programing

DataFrame의 파티셔닝을 정의하는 방법은 무엇입니까?

mailnote 2023. 11. 7. 21:03
반응형

DataFrame의 파티셔닝을 정의하는 방법은 무엇입니까?

Spark 1.4.0에서 Spark SQL과 DataFrame을 사용하기 시작했습니다.Scala에서 DataFrame에 사용자 정의 파티셔닝을 정의하고 싶지만 이를 수행하는 방법은 알 수 없습니다.

제가 작업하고 있는 데이터 테이블 중 하나에는 다음 예시에 해당하는 실리마르(silimar)의 거래 목록이 포함되어 있습니다.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

적어도 처음에는 대부분의 계산이 계정 내의 거래 사이에서 이루어질 것입니다.그래서 계정에 대한 모든 트랜잭션이 동일한 Spark 파티션에 있도록 데이터를 분할하고 싶습니다.

하지만 이걸 정의할 방법이 없습니다.DataFrame 클래스에는 'repartition(Int)'이라는 메서드가 있으며, 이 메서드에서 생성할 파티션 수를 지정할 수 있습니다.그러나 RDD에 대해 지정할 수 있는 것과 같이 DataFrame에 대해 사용자 지정 파티셔닝을 정의할 수 있는 방법이 없습니다.

소스 데이터는 Parquet에 저장됩니다.Parquet에 DataFrame을 작성할 때 분할할 열을 지정할 수 있다는 것을 확인했습니다. 그래서 Parquet에 'Account' 열로 데이터를 분할하도록 지시할 수 있을 것입니다.하지만 수백만 개의 계정이 있을 수도 있고, Parquet에 대해 제대로 이해하고 있다면, 계정마다 고유한 디렉토리가 생성될 것이기 때문에 합리적인 해결책이 아닌 것 같습니다.

계정의 모든 데이터가 동일한 파티션에 있도록 Spark가 이 DataFrame을 분할하도록 할 수 있는 방법이 있습니까?

스파크 >= 2.3.0

SPARK-22614는 레인지 파티션을 노출합니다.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389Data Source API v2에서 외부 포맷 파티셔닝을 제공합니다.

스파크 >= 1.6.0

Spark >= 1.6에서는 쿼리 및 캐싱을 위해 열별 파티셔닝을 사용할 수 있습니다.참조: SPARK-11410SPARK-4849 사용repartition방법:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

와는 달리RDDs스파크Dataset(포함)Dataset[Row]일명의DataFrame)에서는 현재 사용자 정의 파티셔닝을 사용할 수 없습니다.일반적으로 인공 분할 열을 생성하여 이 문제를 해결할 수는 있지만 동일한 유연성을 제공하지는 못합니다.

스파크 < 1.6.0:

당신이 할 수 있는 한 가지 일은 당신이 데이터를 생성하기 전에 입력 데이터를 미리 파티셔닝하는 것입니다.DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

부터DataFrame로부터의 창조.RDD단순한 맵 단계의 기존 파티션 레이아웃만 보존해야 함*:

assert(df.rdd.partitions == partitioned.partitions)

기존 파티션을 다시 분할할 수 있는 것과 동일한 방법DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

그래서 불가능한 것은 아닌 것 같습니다.그것이 말이 되는지 의문이 남아있습니다.저는 대부분의 경우 다음과 같이 주장할 것입니다.

  1. 재파티셔닝은 비용이 많이 드는 프로세스입니다.일반적인 시나리오에서는 대부분의 데이터를 직렬화, 셔플화 및 역직렬화해야 합니다.반면에 사전에 분할된 데이터를 통해 이익을 얻을 수 있는 작업 수는 상대적으로 적으며 내부 API가 이 속성을 활용하도록 설계되지 않은 경우에는 더욱 제한됩니다.

    • 일부 시나리오에서는 합류하지만 내부 지원이 필요할 겁니다
    • window functions calls with matching particator.위와 동일하며 단일 창 정의로 제한됩니다.이미 내부적으로 분할되어 있기 때문에 사전 분할이 중복될 수 있습니다.
    • 와의 단순 집계GROUP BY- 임시 버퍼**의 메모리 풋프린트를 줄일 수는 있지만 전체 비용은 훨씬 높습니다.에 상당하는 정도의groupByKey.mapValues(_.reduce)(현재 동작) vsreduceByKey(사전 partition링).실제로는 유용하지 않을 것 같습니다.
    • 데이터 압축:SqlContext.cacheTable. 런 길이 인코딩을 사용하는 것처럼 보이므로 적용합니다.OrderedRDDFunctions.repartitionAndSortWithinPartitions압축비를 향상시킬 수 있습니다.
  2. 성능은 키의 분포에 크게 의존합니다.비대칭인 경우 리소스 활용도가 낮아집니다.최악의 경우에는 일을 끝내는 것이 불가능할 것입니다.

  3. 상위 레벨 선언 API를 사용하는 핵심은 하위 레벨 구현 세부 정보로부터 자신을 격리하는 것입니다.@dwysakowicz @RomiKuntsman이 이미 언급한 바와 같이 최적화는 Catalyst Optimizer의 작업입니다.그것은 꽤나 정교한 짐승이고 나는 당신이 그것의 내부로 더 깊이 들어가지 않고도 그것을 쉽게 개선할 수 있을지 정말 의심스럽습니다.

관련개념

JDBC 소스로 파티셔닝:

JDBC 데이터 원본은 인수를 지원합니다.다음과 같이 사용할 수 있습니다.

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

술어당 하나의 JDBC 파티션을 만듭니다.개별 술어를 사용하여 생성된 집합이 서로소가 아닌 경우 결과 테이블에 중복이 표시됩니다.

partitionBy 메소드:

스파크DataFrameWriter제공.partitionBy쓰기에 대한 데이터를 "partition"하는 데 사용할 수 있는 메서드입니다.제공된 열 집합을 사용하여 쓰기 데이터를 분리합니다.

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

이를 통해 키를 기반으로 한 쿼리에 대해 읽기 시 술어 푸시 다운을 사용할 수 있습니다.

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

하지만 그것은 같은 것이 아닙니다.DataFrame.repartition. 특히 집계는 다음과 같습니다.

val cnts = df1.groupBy($"k").sum()

여전히 필요할 것입니다.TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

(Spark >= 2.0)의 메서드:

bucketBy와 유사한 응용 프로그램을 가지고 있습니다.partitionBy테이블에만 사용할 수 있습니다(saveAsTable결합을 최적화하기 위해 버킷링 정보를 사용할 수 있습니다.

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* 파티션 레이아웃은 데이터 분포만을 의미합니다.partitionedRDD에 더 이상 파티션이 없습니다.** 초기투영이 없다고 가정합니다.집합이 열의 작은 부분만 포함하는 경우에는 아무런 이득도 없을 수 있습니다.

In Spark < 1.6을 생성하면HiveContext, 아주 오래된 것이 아닌SqlContextHiveQL을 사용할 수 있습니다. DISTRIBUTE BY colX...(N개의 감속기 각각이 x의 비중첩 범위를 갖도록 함) &CLUSTER BY colX...(Distribute By 및 Sort By의 바로 가기) 예를 들어,

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

이것이 Spark DF api에 어떻게 들어맞는지 확실하지 않습니다.이러한 키워드는 일반 SqlContext에서 지원되지 않습니다(HiveContext를 사용하기 위해 하이브 메타 스토어가 필요하지 않음).

편집: Spark 1.6+는 이제 네이티브 DataFrame API에 이 기능이 있습니다.

그래서 어떤 대답으로 시작하자면 :) - 당신은 안 됩니다.

저는 전문가는 아니지만 DataFrame에 대해서는 rdd와 동일하지 않으며 DataFrame에는 Partitioner와 같은 것이 없습니다.

일반적으로 DataFrame의 아이디어는 이러한 문제 자체를 처리하는 또 다른 수준의 추상화를 제공하는 것입니다.DataFrame에 대한 쿼리는 논리 계획으로 변환되고 RDD에 대한 작업으로 추가로 변환됩니다.제안한 파티션은 자동으로 적용되거나 최소한 적용되어야 합니다.

SparkSQL이 최적의 작업을 제공할 것이라고 신뢰하지 않을 경우 의견에 제시된 대로 DataFrame을 RDD[Row]로 언제든지 변환할 수 있습니다.

다음이 반환하는 DataFrame을 사용합니다.

yourDF.orderBy(account)

명시적인 사용 방법이 없습니다.partitionByDataFrame에서는 PairRDD에서만 사용되지만 DataFrame을 정렬할 때는 LogicalPlan에 사용되며 각 Account에서 계산을 수행해야 할 때 도움이 됩니다.

계정별로 분할하고자 하는 데이터 프레임과 동일한 정확한 문제를 발견했습니다.계정에 대한 모든 트랜잭션이 동일한 Spark 파티션에 있도록 데이터를 파티션화할 경우 확장 및 성능을 원하지만 코드가 의존하지는 않습니다(사용하는 경우).mapPartitions()등), 맞지요?

저는 RDD를 이용해서 할 수 있었습니다만, 이것이 당신에게 적합한 솔루션인지는 모르겠습니다.RDD로 사용 가능한 DF가 있으면 데이터의 맞춤 재파티셔닝을 수행하도록 신청할 수 있습니다.

제가 사용한 샘플은 다음과 같습니다.

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)

언급URL : https://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-dataframe

반응형